home *** CD-ROM | disk | FTP | other *** search
- # Miro - an RSS based video player application
- # Copyright (C) 2005-2007 Participatory Culture Foundation
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program; if not, write to the Free Software
- # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-
- """Event loop handler.
-
- This module handles the democracy event loop which is responsible for network
- requests and scheduling.
-
- TODO:
- handle user setting clock back
- """
-
- import threading
- import socket
- import errno
- import select
- import heapq
- import Queue
- import util
- import database
- import logging
-
- from clock import clock
-
- import util
-
- cumulative = {}
-
- class DelayedCall(object):
- def __init__(self, function, name, args, kwargs):
- self.function = function
- self.name = name
- self.args = args
- self.kwargs = kwargs
- self.canceled = False
-
- def _unlink(self):
- """Removes the references that this object has to the outside world,
- this eases the GC's work in finding cycles and fixes some memory leaks
- on windows.
- """
- self.function = self.args = self.kwargs = None
-
- def cancel(self):
- self.canceled = True
- self._unlink()
-
- def dispatch(self):
- if not self.canceled:
- when = "While handling %s" % self.name
- start = clock()
- util.trapCall(when, self.function, *self.args, **self.kwargs)
- end = clock()
- if end-start > 0.5:
- logging.timing ("%s too slow (%.3f secs)",
- self.name, end-start)
- try:
- total = cumulative[self.name]
- except KeyboardInterrupt:
- raise
- except:
- total = 0
- total += end - start
- cumulative[self.name] = total
- if total > 5.0:
- logging.timing ("%s cumulative is too slow (%.3f secs)",
- self.name, total)
- cumulative[self.name] = 0
- self._unlink()
-
- class Scheduler(object):
- def __init__(self):
- self.heap = []
-
- def addTimeout(self, delay, function, name, args=None, kwargs=None):
- if args is None:
- args = ()
- if kwargs is None:
- kwargs = {}
- scheduledTime = clock() + delay
- dc = DelayedCall(function, "timeout (%s)" % (name,), args, kwargs)
- heapq.heappush(self.heap, (scheduledTime, dc))
- return dc
-
- def nextTimeout(self):
- if len(self.heap) == 0:
- return None
- else:
- return max(0, self.heap[0][0] - clock())
-
- def hasPendingTimeout(self):
- return len(self.heap) > 0 and self.heap[0][0] < clock()
-
- def processNextTimeout(self):
- time, dc = heapq.heappop(self.heap)
- dc.dispatch()
-
- def processTimeouts(self):
- while self.hasPendingTimeout():
- self.processNextTimeout()
-
- class CallQueue(object):
- def __init__(self):
- self.queue = Queue.Queue()
-
- def addIdle(self, function, name, args=None, kwargs=None):
- if args is None:
- args = ()
- if kwargs is None:
- kwargs = {}
- dc = DelayedCall (function, "idle (%s)" % (name,), args, kwargs)
- self.queue.put (dc)
- #self.queue.put((dc, clock()))
- return dc
-
- def processNextIdle(self):
- dc = self.queue.get()
- #dc, requested = self.queue.get()
- #start = clock()
- dc.dispatch()
- #if start - requested > 1.0:
- # print "WARNING: %s took too long to fire (%.3f secs)" % (
- # dc.name, start - requested)
-
- def hasPendingIdle(self):
- return not self.queue.empty()
-
- def processIdles(self):
- while self.hasPendingIdle():
- self.processNextIdle()
-
- class ThreadPool(object):
- """The thread pool is used to handle calls like gethostbyname() that block
- and there's no asynchronous workaround. What we do instead is call them
- in a separate thread and return the result in a callback that executes in
- the event loop.
- """
- THREADS = 3
-
- def __init__(self, eventLoop):
- self.eventLoop = eventLoop
- self.queue = Queue.Queue()
- self.threads = []
-
- def initThreads(self):
- while len(self.threads) < ThreadPool.THREADS:
- t = threading.Thread(name='ThreadPool - %d' % len(self.threads),
- target=self.threadLoop)
- t.setDaemon(True)
- t.start()
- self.threads.append(t)
-
- def threadLoop(self):
- while True:
- nextItem = self.queue.get()
- if nextItem == "QUIT":
- break
- else:
- callback, errback, func, name, args, kwargs, = nextItem
- try:
- result = func(*args, **kwargs)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- func = errback
- name = 'Thread Pool Errback (%s)' % name
- args = (e,)
- else:
- func = callback
- name = 'Thread Pool Callback (%s)' % name
- args = (result,)
- if not self.eventLoop.quitFlag:
- self.eventLoop.idleQueue.addIdle(func, name, args=args)
- self.eventLoop.wakeup()
-
- def queueCall(self, callback, errback, function, name, *args, **kwargs):
- self.queue.put((callback, errback, function, name, args, kwargs))
-
- def closeThreads(self):
- for x in xrange(len(self.threads)):
- self.queue.put("QUIT")
- while len(self.threads) > 0:
- x = self.threads.pop()
- try:
- x.join()
- except:
- pass
-
- class EventLoop(object):
- def __init__(self):
- self.scheduler = Scheduler()
- self.idleQueue = CallQueue()
- self.urgentQueue = CallQueue()
- self.threadPool = ThreadPool(self)
- self.readCallbacks = {}
- self.writeCallbacks = {}
- self.wakeSender, self.wakeReceiver = util.makeDummySocketPair()
- self.addReadCallback(self.wakeReceiver, self._slurpWakerData)
- self.quitFlag = False
- self.delegate = None
- self.clearRemovedCallbacks()
-
- def clearRemovedCallbacks(self):
- self.removedReadCallbacks = set()
- self.removedWriteCallbacks = set()
-
- def _slurpWakerData(self):
- self.wakeReceiver.recv(1024)
-
- def setDelegate(self, delegate):
- self.delegate = delegate
-
- def addReadCallback(self, socket, callback):
- self.readCallbacks[socket.fileno()] = callback
-
- def removeReadCallback(self, socket):
- del self.readCallbacks[socket.fileno()]
- self.removedReadCallbacks.add(socket.fileno())
-
- def addWriteCallback(self, socket, callback):
- self.writeCallbacks[socket.fileno()] = callback
-
- def removeWriteCallback(self, socket):
- del self.writeCallbacks[socket.fileno()]
- self.removedWriteCallbacks.add(socket.fileno())
-
- def wakeup(self):
- self.wakeSender.send("b")
-
- def callInThread(self, callback, errback, function, name, *args, **kwargs):
- self.threadPool.queueCall(callback, errback, function, name, *args, **kwargs)
-
- def _beginLoop(self):
- if self.delegate is not None and hasattr(self.delegate, "beginLoop"):
- self.delegate.beginLoop(self)
-
- def _endLoop(self):
- if self.delegate is not None and hasattr(self.delegate, "endLoop"):
- self.delegate.endLoop(self)
-
- def loop(self):
- global loop_ready
- database.set_thread()
- loop_ready.set()
- while not self.quitFlag:
- self._beginLoop()
- self.clearRemovedCallbacks()
- timeout = self.scheduler.nextTimeout()
- readfds = self.readCallbacks.keys()
- writefds = self.writeCallbacks.keys()
- try:
- readables, writeables, _ = select.select(readfds, writefds, [],
- timeout)
- except select.error, (err, detail):
- if err == errno.EINTR:
- logging.warning ("eventloop: %s", detail)
- else:
- raise
- if self.quitFlag:
- break
- self.urgentQueue.processIdles()
- for event in self.generateEvents(readables, writeables):
- event()
- if self.quitFlag:
- break
- self.urgentQueue.processIdles()
- if self.quitFlag:
- break
- self._endLoop()
-
- def generateEvents(self, readables, writeables):
- """Generator that creates the list of events that should be dealt with
- on this iteration of the event loop. This includes all socket
- read/write callbacks, timeouts and idle calls. "events" are
- implemented as functions that should be called with no arguments.
- """
-
- for callback in self.generateCallbacks(writeables,
- self.writeCallbacks, self.removedWriteCallbacks):
- yield callback
- for callback in self.generateCallbacks(readables,
- self.readCallbacks, self.removedReadCallbacks):
- yield callback
- while self.scheduler.hasPendingTimeout():
- yield self.scheduler.processNextTimeout
- while self.idleQueue.hasPendingIdle():
- yield self.idleQueue.processNextIdle
-
- def generateCallbacks(self, readyList, map, removed):
- for fd in readyList:
- try:
- function = map[fd]
- except KeyError:
- # this can happen the write callback removes the read callback
- # or vise versa
- pass
- else:
- if fd in removed:
- continue
- when = "While talking to the network"
- def callbackEvent():
- if not util.trapCall(when, function):
- del map[fd]
- yield callbackEvent
-
- _eventLoop = EventLoop()
-
- def setDelegate(delegate):
- _eventLoop.setDelegate(delegate)
-
- def addReadCallback(socket, callback):
- """Add a read callback. When socket is ready for reading, callback will
- be called. If there is already a read callback installed, it will be
- replaced.
- """
- _eventLoop.addReadCallback(socket, callback)
-
- def removeReadCallback(socket):
- """Remove a read callback. If there is not a read callback installed for
- socket, a KeyError will be thrown.
- """
- _eventLoop.removeReadCallback(socket)
-
- def addWriteCallback(socket, callback):
- """Add a write callback. When socket is ready for writing, callback will
- be called. If there is already a write callback installed, it will be
- replaced.
- """
- _eventLoop.addWriteCallback(socket, callback)
-
- def removeWriteCallback(socket):
- """Remove a write callback. If there is not a write callback installed for
- socket, a KeyError will be thrown.
- """
- _eventLoop.removeWriteCallback(socket)
-
- def stopHandlingSocket(socket):
- """Convience function to that removes both the read and write callback for
- a socket if they exist."""
- try:
- removeReadCallback(socket)
- except KeyError:
- pass
- try:
- removeWriteCallback(socket)
- except KeyError:
- pass
-
- def addTimeout(delay, function, name, args=None, kwargs=None):
- """Schedule a function to be called at some point in the future.
- Returns a DelayedCall object that can be used to cancel the call.
- """
-
- dc = _eventLoop.scheduler.addTimeout(delay, function, name, args, kwargs)
- return dc
-
- def addIdle(function, name, args=None, kwargs=None):
- """Schedule a function to be called when we get some spare time.
- Returns a DelayedCall object that can be used to cancel the call.
- """
-
- dc = _eventLoop.idleQueue.addIdle(function, name, args, kwargs)
- _eventLoop.wakeup()
- return dc
-
- def addUrgentCall(function, name, args=None, kwargs=None):
- """Schedule a function to be called as soon as possible. This method
- should be used for things like GUI actions, where the user is waiting on
- us.
- """
-
- dc = _eventLoop.urgentQueue.addIdle(function, name, args, kwargs)
- _eventLoop.wakeup()
- return dc
-
- def callInThread(callback, errback, function, name, *args, **kwargs):
- """Schedule a function to be called in a separate thread. Do not
- put code that accesses the database or the UI here!
- """
- _eventLoop.callInThread(callback, errback, function, name, *args, **kwargs)
-
- lt = None
-
- profile_file = None
-
- loop_ready = threading.Event()
-
- def startup():
- threadPoolInit()
-
- def profile_startup():
- import profile
- def start_loop():
- _eventLoop.loop()
- profile.runctx ('_eventLoop.loop()', globals(), locals(), profile_file + ".event_loop")
-
- global lt
- global loop_ready
- if profile_file:
- lt = threading.Thread(target=profile_startup, name="Event Loop")
- else:
- lt = threading.Thread(target=_eventLoop.loop, name="Event Loop")
- lt.setDaemon(False)
- lt.start()
- loop_ready.wait()
-
-
- def join():
- global lt
- if lt is not None:
- lt.join()
-
- def quit():
- threadPoolQuit()
- _eventLoop.quitFlag = True
- _eventLoop.wakeup()
-
- def resetEventLoop():
- _eventLoop = EventLoop()
-
- def threadPoolQuit():
- _eventLoop.threadPool.closeThreads()
-
- def threadPoolInit():
- _eventLoop.threadPool.initThreads()
-
- def asIdle(func):
- """Decorator to make a methods run as an idle function
-
- Suppose you have 2 methods, foo and bar
-
- @asIdle
- def foo():
- # database operations
-
- def bar():
- # same database operations as foo
-
- Then calling foo() is exactly the same as calling addIdle(bar).
- """
-
- def queuer(*args, **kwargs):
- return addIdle(func, "%s() (using asIdle)" % func.__name__, args=args, kwargs=kwargs)
- return queuer
-
- def asUrgent(func):
- """Like asIdle, but uses addUrgentCall() instead of addIdle()."""
-
- def queuer(*args, **kwargs):
- return addUrgentCall(func, "%s() (using asUrgent)" % func.__name__, args=args, kwargs=kwargs)
- return queuer
-
- def checkHeapSize():
- logging.info ("Heap size: %d.", len(_eventLoop.scheduler.heap))
- addTimeout(5, checkHeapSize, "Check Heap Size")
- #addTimeout(5, checkHeapSize, "Check Heap Size")
-